永久免费看黄A片无码软件,japanese 在线观看国产,强奷高H猛烈失禁潮喷播放,亚洲成精品自拍

rexian

咨詢電話:023-6276-4481

熱門文章

聯(lián)系方式

電 話:023-6276-4481

郵箱:broiling@qq.com

地址:重慶市南岸區(qū)亞太商谷6幢25-2

當前位置:網(wǎng)站首頁 > 技術文章 > 緩存一致性和跨服務器查詢的數(shù)據(jù)異構(gòu)解決方案canal

緩存一致性和跨服務器查詢的數(shù)據(jù)異構(gòu)解決方案canal

編輯:kangkang 發(fā)表時間:2017-08-31 09:10:22
kangkang

 當你的項目數(shù)據(jù)量上去了之后,通常會遇到兩種情況,第一種情況應是最大可能的使用cache來對抗上層的高并發(fā),第二種情況同樣也是需要使用分庫

分表對抗上層的高并發(fā)。。。逼逼逼起來容易,做起來并不那么樂觀,由此引入的問題,不見得你有好的解決方案,下面就具體分享下。

 

一:盡可能的使用Cache

       比如在我們的千人千面系統(tǒng)中,會針對商品,訂單等維度為某一個商家店鋪自動化建立大約400個數(shù)據(jù)模型,然后買家在淘寶下訂單之后,淘寶會將訂單推

送過來,訂單會在400個模型中兜一圈,從而推送更貼切符合該買家行為習慣的短信和郵件,這是一個真實的業(yè)務場景,為了應對高并發(fā),這些模型自然都是緩

存在Cache中,模型都是從db中灌到redis的,那如果有新的模型進來了,我如何通知redis進行緩存更新呢???通常的做法就是在添加模型的時候,順便更新

redis。。。對吧,如下圖:214741-20170830220347577-1429929161.jpg

214741-20170830221242296-2089140243.jpg

上面這張圖,相信大家都能看得懂,重點就是這個處理binlog程序,從binlog中分析出CURD從而更新Redis,其實這個binlog程序就是本篇所說的canal。。。

一個偽裝成mysql的slave,不斷的通過dump命令從mysql中盜出binlog日志,從而完美的實現(xiàn)了這個需求。

 

二:數(shù)據(jù)異構(gòu)    

       本篇開頭也說到了,數(shù)據(jù)量大了之后,必然會存在分庫分表,甚至database都要分散到多臺服務器上,現(xiàn)在的電商項目,都是業(yè)務趕著技術跑。。。

誰也不知道下一個業(yè)務會是一個怎樣的奇葩,所以必然會導致你要做一些跨服務器join查詢,你以為自己很聰明,其實DBA早就把跨服務器查詢的函數(shù)給你

關掉了,求爹爹拜奶奶都不會給你開的,除非你殺一個DBA祭天,不過如果你的業(yè)務真的很重要,可能DBA會給你做數(shù)據(jù)異構(gòu),所謂的數(shù)據(jù)異構(gòu),那就是

將需要join查詢的多表按照某一個維度又聚合在一個DB中。讓你去查詢。。。。。

三:搭建一覽

     好了,canal的應用場景給大家也介紹到了,最主要是理解這種思想,人家搞不定的東西,你的價值就出來了。

 

1.  開啟mysql的binlog功能

        開啟binlog,并且將binlog的格式改為Row,這樣就可以獲取到CURD的二進制內(nèi)容,windows上的路徑為:C:\Program Files\MySQL\MySQL Server 5.7

\my.ini。

1 log-bin=mysql-bin #添加這一行就ok2 binlog-format=ROW #選擇row模式3 server_id=1

2. 驗證binlog是否開啟

       使用命令驗證,并且開啟binlog的過期時間為30天,默認情況下binlog是不過期的,這就導致你的磁盤可能會爆滿,直到掛掉。

1 show variables like 'log_%';2 3 #設置binlog的過期時間為30天4 show variables like '%expire_logs_days%';5 set global expire_logs_days=30;

 3. 給canal服務器分配一個mysql的賬號權限,方便canal去偷binlog日志。

CREATE USER canal IDENTIFIED BY 'canal';    
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
FLUSH PRIVILEGES;  

show grants for 'canal'

4. 下載canal

 github的地址: https://github.com/alibaba/canal/releases

5. 然后就是各種解壓的命令

復制代碼

[root@localhost ~]# ls
1                              Downloads                                mongodb-linux-x86_64-3.4.4      redis-3.2.9         Videos
anaconda-ks.cfg                dubbo                                    mongodb-linux-x86_64-3.4.4.tgz  redis-3.2.9.tar.gz  zookeeper.outapache-maven-3.5.0-bin.tar.gz  dubbo-monitor-simple-2.5.4-SNAPSHOT.jar  Music                           redis-4.0.1Desktop                        hadoop                                   Pictures                        software
Documents                      hadoop-3.0.0-alpha3.tar.gz               Public                          Templates
[root@localhost ~]# cd /usr/myapp
[root@localhost myapp]# ls
apache-maven-3.5.0-bin.tar.gz                        dubbo-monitor-simple-2.5.4-SNAPSHOT.jar     nginx                tengine-2.2.0.tar.gz
canal                                                gearmand                                    nginx-1.13.4.tar.gz  tengine_stcanal.deployer-1.0.24.tar.gz                         gearmand-1.1.17                             nginx_st             tomcat
dubbo                                                gearmand-1.1.17.tar.gz                      redis                zookeeper
dubbo-monitor-simple-2.5.4-SNAPSHOT                  maven                                       redis-4.0.1.tar.gz   zookeeper-3.4.9.tar.gz
dubbo-monitor-simple-2.5.4-SNAPSHOT-assembly.tar.gz  mysql-5.7.19-linux-glibc2.12-x86_64.tar.gz  tengine
[root@localhost myapp]# cd canal
[root@localhost canal]# ls
bin  conf  lib  logs
[root@localhost canal]# cd conf
[root@localhost conf]# ls
canal.properties  example  logback.xml  spring
[root@localhost conf]# cd example
[root@localhost example]# ls
instance.properties  meta.dat
[root@localhost example]#

復制代碼

 

6. canal 和 instance 配置文件

     canal的模式是這樣的,一個canal里面可能會有多個instance,也就說一個instance可以監(jiān)控一個mysql實例,多個instance也就可以對應多臺服務器

的mysql實例。也就是一個canal就可以監(jiān)控分庫分表下的多機器mysql。

 

《1》 canal.properties

      它是全局性的canal服務器配置,具體如下,這里面的參數(shù)涉及到方方面面。

復制代碼

#################################################
#########               common argument         ############# 
#################################################
canal.id= 1canal.ip=canal.port= 11111canal.zkServers=# flush data to zk
canal.zookeeper.flush.period = 1000# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
    
## detecing config
canal.instance.detecting.enable = false#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1canal.instance.detecting.interval.time = 3canal.instance.detecting.retry.threshold = 3canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384canal.instance.network.sendBufferSize = 16384canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.query.dcl = falsecanal.instance.filter.query.dml = falsecanal.instance.filter.query.ddl = falsecanal.instance.filter.table.error = falsecanal.instance.filter.rows = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false#################################################
#########               destinations            ############# 
#################################################canal.destinations= example# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = truecanal.auto.scan.interval = 5canal.instance.global.mode = spring 
canal.instance.global.lazy = false#canal.instance.global.manager.address = 127.0.0.1:1099#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

#################################################  
## mysql serverId  
canal.instance.mysql.slaveId = 1234  # position info,需要改成自己的數(shù)據(jù)庫信息  
canal.instance.master.address = 127.0.0.1:3306   canal.instance.master.journal.name =canal.instance.master.position =canal.instance.master.timestamp =#canal.instance.standby.address =   #canal.instance.standby.journal.name =  #canal.instance.standby.position =   #canal.instance.standby.timestamp =   # username/password,需要改成自己的數(shù)據(jù)庫信息  
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456canal.instance.defaultDatabaseName = datamip  
canal.instance.connectionCharset = UTF-8  # table regex  
canal.instance.filter.regex = .*\\..*  #################################################

復制代碼

   

    由于是全局性的配置,所以上面三處標紅的地方要注意一下:

canal.port= 11111                 當前canal的服務器端口號

canal.destinations= example      當前默認開啟了一個名為example的instance實例,如果想開多個instance,用","逗號隔開就可以了。。。

canal.instance.filter.regex = .*\\..*    mysql實例下的所有db的所有表都在監(jiān)控范圍內(nèi)。

 

《2》 instance.properties

      這個就是具體的某個instances實例的配置,未涉及到的配置都會從canal.properties上繼承。

復制代碼

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234# position infocanal.instance.master.address = 192.168.23.1:3306canal.instance.master.journal.name =canal.instance.master.position =canal.instance.master.timestamp =#canal.instance.standby.address = #canal.instance.standby.journal.name =#canal.instance.standby.position = #canal.instance.standby.timestamp = # username/passwordcanal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =datamipcanal.instance.connectionCharset = UTF-8# table regexcanal.instance.filter.regex = .*\\..*# table black regex
canal.instance.filter.black.regex =#################################################

復制代碼

 

    上面標紅的地方注意下就好了,去偷binlog的時候,需要知道的mysql地址和用戶名,密碼。

 

7. 開啟canal

      大家要記得把/canal/bin 目錄配置到 /etc/profile 的 Path中,方便快速開啟,通過下圖你會看到11111端口已經(jīng)在centos上開啟了。

復制代碼

[root@localhost bin]# ls
canal.pid  startup.bat  startup.sh  stop.sh
[root@localhost bin]# pwd
/usr/myapp/canal/bin
[root@localhost example]# startup.sh
cd to /usr/myapp/canal/bin for workaround relative path
LOG CONFIGURATION : /usr/myapp/canal/bin/../conf/logback.xml
canal conf : /usr/myapp/canal/bin/../conf/canal.properties
CLASSPATH :/usr/myapp/canal/bin/../conf:/usr/myapp/canal/bin/../lib/zookeeper-3.4.5.jar:/usr/myapp/canal/bin/../lib/zkclient-0.1.jar:/usr/myapp/canal/bin/../lib/spring-2.5.6.jar:/usr/myapp/canal/bin/../lib/slf4j-api-1.7.12.jar:/usr/myapp/canal/bin/../lib/protobuf-java-2.6.1.jar:/usr/myapp/canal/bin/../lib/oro-2.0.8.jar:/usr/myapp/canal/bin/../lib/netty-all-4.1.6.Final.jar:/usr/myapp/canal/bin/../lib/netty-3.2.5.Final.jar:/usr/myapp/canal/bin/../lib/logback-core-1.1.3.jar:/usr/myapp/canal/bin/../lib/logback-classic-1.1.3.jar:/usr/myapp/canal/bin/../lib/log4j-1.2.14.jar:/usr/myapp/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/usr/myapp/canal/bin/../lib/guava-18.0.jar:/usr/myapp/canal/bin/../lib/fastjson-1.2.28.jar:/usr/myapp/canal/bin/../lib/commons-logging-1.1.1.jar:/usr/myapp/canal/bin/../lib/commons-lang-2.6.jar:/usr/myapp/canal/bin/../lib/commons-io-2.4.jar:/usr/myapp/canal/bin/../lib/commons-beanutils-1.8.2.jar:/usr/myapp/canal/bin/../lib/canal.store-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.sink-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.server-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.protocol-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.parse.driver-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.parse.dbsync-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.parse-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.meta-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.instance.spring-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.instance.manager-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.instance.core-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.filter-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.deployer-1.0.24.jar:/usr/myapp/canal/bin/../lib/canal.common-1.0.24.jar:/usr/myapp/canal/bin/../lib/aviator-2.2.1.jar:
cd to /usr/myapp/canal/conf/example for continue[root@localhost example]# netstat -tln
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State      
tcp        0      0 0.0.0.0:11111           0.0.0.0:*               LISTEN     tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN     
tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN     
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN     
tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN     
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN     
tcp6       0      0 :::111                  :::*                    LISTEN     
tcp6       0      0 :::22                   :::*                    LISTEN     
tcp6       0      0 ::1:631                 :::*                    LISTEN     
tcp6       0      0 ::1:25                  :::*                    LISTEN     
[root@localhost example]#

復制代碼

 

8. Java Client 代碼

   canal driver 需要在maven倉庫中獲取一下:http://www.mvnrepository.com/artifact/com.alibaba.otter/canal.client/1.0.24,不過依賴還是蠻多的。

        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.0.24</version>
        </dependency>

214741-20170830231949452-1721463308.jpg

9. 啟動java代碼進行驗證

      下面的代碼對table的CURD都做了一個基本的判斷,看看是不是能夠智能感知,然后可以根據(jù)實際情況進行redis的更新操作。。。

復制代碼

package com.datamip.canal;import java.awt.Event;import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.Header;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.Message;import com.google.protobuf.InvalidProtocolBufferException;public class App {    public static void main(String[] args) throws InterruptedException {        // 第一步:與canal進行連接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.23.170", 11111),                "example", "", "");
        connector.connect();        // 第二步:開啟訂閱        connector.subscribe();        // 第三步:循環(huán)訂閱
        while (true) {            try {                // 每次讀取 1000 條
                Message message = connector.getWithoutAck(1000);                long batchID = message.getId();                int size = message.getEntries().size();                if (batchID == -1 || size == 0) {
                    System.out.println("當前暫時沒有數(shù)據(jù)");
                    Thread.sleep(1000); // 沒有數(shù)據(jù)
                } else {
                    System.out.println("-------------------------- 有數(shù)據(jù)啦 -----------------------");
                    PrintEntry(message.getEntries());
                }                // position id ack (方便處理下一條)                connector.ack(batchID);

            } catch (Exception e) {                // TODO: handle exception
            } finally {
                Thread.sleep(1000);
            }
        }
    }    // 獲取每條打印的記錄
    @SuppressWarnings("static-access")    public static void PrintEntry(List<Entry> entrys) {        for (Entry entry : entrys) {            // 第一步:拆解entry 實體
            Header header = entry.getHeader();
            EntryType entryType = entry.getEntryType();            // 第二步: 如果當前是RowData,那就是我需要的數(shù)據(jù)
            if (entryType == EntryType.ROWDATA) {

                String tableName = header.getTableName();
                String schemaName = header.getSchemaName();

                RowChange rowChange = null;                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }

                EventType eventType = rowChange.getEventType();

                System.out.println(String.format("當前正在操作 %s.%s, Action= %s", schemaName, tableName, eventType));                // 如果是‘查詢’ 或者 是 ‘DDL’ 操作,那么sql直接打出來
                if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                    System.out.println("rowchange sql ----->" + rowChange.getSql());                    return;
                }                // 第三步:追蹤到 columns 級別
                rowChange.getRowDatasList().forEach((rowData) -> {                    // 獲取更新之前的column情況
                    List<Column> beforeColumns = rowData.getBeforeColumnsList();                    // 獲取更新之后的 column 情況
                    List<Column> afterColumns = rowData.getAfterColumnsList();                    // 當前執(zhí)行的是 刪除操作
                    if (eventType == EventType.DELETE) {
                        PrintColumn(beforeColumns);
                    }                    // 當前執(zhí)行的是 插入操作
                    if (eventType == eventType.INSERT) {
                        PrintColumn(afterColumns);
                    }                    // 當前執(zhí)行的是 更新操作
                    if (eventType == eventType.UPDATE) {
                        PrintColumn(afterColumns);
           &n